Coordinating Dependent File Uploads with AWS Step Functions

This article is co-authored by Alex Johnson, Enterprise Support Lead, Serverless, and Jamie Lee, Technical Account Manager, Serverless.

Amazon S3 is a widely used object storage service for managing files. With Amazon S3 Event Notifications or Amazon EventBridge, you can build event-driven architecture (EDA) workloads that react to events emitted when objects in S3 buckets change.

EDA facilitates asynchronous interactions among system components, allowing for greater autonomy and decoupling. However, certain situations might introduce dependencies between events that can lead to a tightly coupled architecture. This blog explores a typical instance of this dependency and demonstrates how AWS Step Functions can effectively manage it.

Overview

Consider an organization with two independent teams: the Sales team and the Warehouse team. Each team uploads a monthly data file to an S3 bucket for subsequent processing. Upon upload, these files generate events that trigger downstream operations. The Warehouse file undergoes data cleansing and is merged with information from the Shipping team. The Sales file is then correlated with the combined Warehouse and Shipping data, enabling analysts to perform forecasting and derive insights.

To achieve the necessary correlation, the Warehouse file must be processed before the Sales file. However, because the teams operate autonomously, there is no coordination between them: either file may be uploaded at any time, with no guarantee that the Warehouse file has been processed first.

In such scenarios, the Aggregator pattern is a viable solution. This pattern aggregates and retains events, triggering a new event based on the combined occurrences. In this case, the relevant events are the processed Warehouse file and the uploaded Sales file.

The requirements for implementing the aggregator pattern include:

  • Correlation: A mechanism for grouping related events, which can be accomplished using a unique identifier in the file name.
  • Event Aggregator: A stateful repository for the events.
  • Completion Check and Trigger: A condition to ascertain when the combined events have been received and a means to publish the resulting event.

Architecture Overview

The architecture employs several AWS services:

  • Amazon DynamoDB as the event aggregator.
  • Step Functions for orchestrating the workflow.
  • AWS Lambda to parse the file name and extract the correlation identifier.
  • AWS Serverless Application Model (AWS SAM) for infrastructure as code and deployment.

The workflow proceeds as follows:

  1. File Upload: Both the Sales and Warehouse teams upload their respective files to S3.
  2. EventBridge: An ObjectCreated event is dispatched to EventBridge, where a rule directs it to the main workflow.
  3. Main State Machine: This orchestrates the aggregator operations and file processing, isolating the aggregator logic from the file workflows.
  4. File Parser and Correlation: A Lambda function executes the business logic to identify the file and its type.
  5. Stateful Store: A DynamoDB table maintains information about the file, including its name, type, and processing status. The state machine interacts with this table to read and write data, including task tokens.
  6. File Processing: Based on the file type and pre-conditions, appropriate state machines handle the processing of each file.
  7. Task Token & Callback: When the dependent file arrives before the independent file has been processed, a task token is generated. The Step Functions “Wait for a Callback” pattern pauses the dependent file’s execution and resumes it once the independent file has been processed.

Walkthrough

Before you begin, ensure that you have the following prerequisites:

  • AWS CLI and AWS SAM CLI installed.
  • An AWS account.
  • Adequate permissions to manage AWS resources.
  • Git installed.

Refer to the GitHub repository for deployment instructions. This walkthrough illustrates the scenario where the dependent file (Sales file) is uploaded before the independent one (Warehouse file).

The workflow initiates with the Sales file being uploaded to its designated S3 bucket. For the sake of this example, distinct S3 buckets are utilized for the two files, reflecting the autonomy of the Sales and Warehouse teams. Sample files can be found in the code repository.
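
If you prefer to script the upload rather than use the S3 console, a minimal boto3 sketch might look like the following. The local path and object key are illustrative assumptions; use the sample file names from the repository, keeping the type and year_month convention expected by the parser.

import boto3

s3 = boto3.client("s3")

# The bucket name matches the EventBridge rule shown below; the key follows
# the assumed <type>_<year>_<month> naming convention used for correlation.
s3.upload_file(
    Filename="./sample-files/sales_2023_09.csv",  # illustrative local path
    Bucket="sales-mfu-eda-09092023",
    Key="sales_2023_09.csv",
)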

Upon uploading to S3, an event is sent to EventBridge, which triggers the aggregator state machine. The event pattern configured in the EventBridge rule is:

{
  "detail-type": ["Object Created"],
  "source": ["aws.s3"],
  "detail": {
    "bucket": {
      "name": ["sales-mfu-eda-09092023", "warehouse-mfu-eda-09092023"]
    },
    "reason": ["PutObject"]
  }
}
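
Note that S3 delivers object-level events to EventBridge only when EventBridge delivery is enabled on the bucket. The SAM template in the repository presumably takes care of this during deployment; if you wire the buckets up manually, a minimal boto3 sketch looks like this (bucket name taken from the rule above):

import boto3

s3 = boto3.client("s3")

# Enable delivery of this bucket's object-level events to EventBridge.
s3.put_bucket_notification_configuration(
    Bucket="sales-mfu-eda-09092023",
    NotificationConfiguration={"EventBridgeConfiguration": {}},
)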

The aggregator state machine begins by invoking the file parser Lambda function, which identifies the file type and correlates files using the identifier. The file name includes the type and correlation identifier (year_month). You may adjust this function to parse other representations of the file type and correlation identifier.
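
As a rough illustration, a parser along these lines could extract both values from the object key delivered in the EventBridge event. The key format (for example sales_2023_09.csv) and the returned field names are assumptions; the actual function in the repository may differ.

import os

def lambda_handler(event, context):
    # The EventBridge "Object Created" event carries the object key in detail.object.key.
    key = event["detail"]["object"]["key"]

    # Assumed naming convention: <type>_<year>_<month>.<ext>, e.g. sales_2023_09.csv
    name, _ = os.path.splitext(os.path.basename(key))
    file_type, year, month = name.split("_")

    return {
        "fileType": file_type,               # 'sales' or 'warehouse'
        "correlationId": f"{year}_{month}",  # e.g. '2023_09'
    }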

The state machine then inserts a record for the event into the DynamoDB event aggregator table, which employs a composite primary key with the correlation identifier as the partition key and the file type as the sort key. The processing status is tracked to provide feedback on the workflow state.
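
Expressed with boto3 rather than the state machine's native DynamoDB integration, inserting the aggregator record might look like the sketch below. The table and attribute names are illustrative; the deployed template defines the actual ones.

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("file-aggregator-table")  # illustrative name

def record_event(correlation_id: str, file_type: str, file_name: str) -> None:
    # correlationId is the partition key and fileType the sort key; the
    # processing status is tracked so the workflow state can be inspected later.
    table.put_item(
        Item={
            "correlationId": correlation_id,  # e.g. '2023_09'
            "fileType": file_type,            # 'sales' or 'warehouse'
            "fileName": file_name,
            "processingStatus": "PENDING",
        }
    )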

Based on the file type, the state machine determines the appropriate branch to execute. In this case, the Sales branch is activated. The state machine queries DynamoDB for the status of the dependent Warehouse file using the correlation identifier to check if it has been processed.

Since the Warehouse file has not yet been processed, the waitForTaskToken integration pattern is employed. The state machine pauses at this point, generating a task token that external services can utilize to resume the state machine’s execution. The Sales record in DynamoDB is updated with the task token.
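
Sketched in Python against the same assumed table layout, the equivalent logic is a completion check on the Warehouse item followed by attaching the token (exposed to the state machine as $$.Task.Token) to the Sales item:

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("file-aggregator-table")  # illustrative name

def warehouse_completed(correlation_id: str) -> bool:
    # Completion check: has the Warehouse file for this period been processed?
    resp = table.get_item(Key={"correlationId": correlation_id, "fileType": "warehouse"})
    item = resp.get("Item")
    return bool(item) and item.get("processingStatus") == "COMPLETED"

def store_task_token(correlation_id: str, task_token: str) -> None:
    # Park the Sales execution: save the token so a later Warehouse run can resume it.
    table.update_item(
        Key={"correlationId": correlation_id, "fileType": "sales"},
        UpdateExpression="SET taskToken = :t, processingStatus = :s",
        ExpressionAttributeValues={":t": task_token, ":s": "WAITING"},
    )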

Next, upload the sample Warehouse file to the Warehouse S3 bucket via the S3 console. This action triggers a new instance of the Step Functions workflow, which follows the alternative branch after the file type decision step. Here, the Warehouse state machine is executed, and the file’s processing status is updated in DynamoDB.

Upon the Warehouse file’s status changing to “Completed,” the Warehouse state machine checks DynamoDB for a pending Sales file. If one exists, it retrieves the task token and calls SendTaskSuccess, which resumes the paused Sales state machine. The Sales processing then runs, and its status is updated in DynamoDB.
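
A Lambda task in the Warehouse branch could perform that callback roughly as follows; as above, the table and attribute names are placeholders.

import json
import boto3

dynamodb = boto3.resource("dynamodb")
sfn = boto3.client("stepfunctions")
table = dynamodb.Table("file-aggregator-table")  # illustrative name

def resume_pending_sales(correlation_id: str) -> None:
    # Look for a Sales record that is parked with a task token and resume it.
    resp = table.get_item(Key={"correlationId": correlation_id, "fileType": "sales"})
    item = resp.get("Item", {})
    if item.get("taskToken"):
        sfn.send_task_success(
            taskToken=item["taskToken"],
            output=json.dumps({"correlationId": correlation_id, "warehouseStatus": "COMPLETED"}),
        )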

Conclusion

This blog highlights a method for managing file dependencies within event-driven architectures. You can customize the provided sample in the code repository to suit your specific needs.

